package org.idea.web.socket.mq;

import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import com.qiyu.common.utils.HttpUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;

/**
 * @Author linhao
 * @Date created in 10:59 上午 2021/5/10
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {

    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            Log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        byte[] bytes = messageExt.getBody();
        String json = new String(bytes);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
        if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

            log.info("[consumeMessage] 广播发送消息：触发----》消息内容为：" + broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);

        } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {

            String sessionKey = broadcastMqDTO.getSessionKey();
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息；触发----》消息内容为：" + broadcastMqDTO);
            }

        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
